package com.omuao.message.websocket.listener.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.omuao.message.websocket.config.MqttProperties;
import com.omuao.message.websocket.facade.ClientSessionManager;
import com.omuao.message.websocket.facade.MqttMessageListener;
import com.omuao.message.websocket.facade.WebSocketMessage;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * MQTT 消息监听器
 *
 * @author Administrator
 */
@Component
public class MqttMessageListenerImpl implements MqttMessageListener {

    @Autowired
    MqttProperties mqttProperties;

    @Autowired
    ClientSessionManager clientSessionManager;

    @Autowired
    @Qualifier("mqttClient")
    MqttClient mqttClient;

    @Autowired
    @Qualifier("mqttConnectOptions")
    MqttConnectOptions mqttConnectOptions;

    @Autowired
    ObjectMapper objectMapper;

    public static Logger logger = LoggerFactory.getLogger(MqttMessageListenerImpl.class);

    @Override
    public boolean sendMessage(WebSocketMessage result) {
        if (!mqttClient.isConnected()) {
            try {
                mqttClient.connect(mqttConnectOptions);
            } catch (MqttException e) {
                logger.warn("正在重新连接...");
                try {
                    mqttClient.reconnect();
                } catch (MqttException ex) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        if (!mqttClient.isConnected()) {
            return false;
        }

        MqttMessage mqttMessage = new MqttMessage();
        String content = null;
        try {
            content = objectMapper.writeValueAsString(result);
        } catch (JsonProcessingException e) {
            logger.error(e.getMessage(), e);
        }
        mqttMessage.setPayload(content.getBytes());
        mqttMessage.setQos(2);
        mqttMessage.setRetained(false);
        try {
            if (clientSessionManager.contains(result.getReceiverId())) {
                mqttClient.publish(mqttProperties.getTopicPrefix() + result.getReceiverId(), mqttMessage);
                return true;
            }
        } catch (MqttException e) {
            //发布失败
            logger.error(e.getMessage(), e);
        }
        return false;
    }
}
